---
title: "A2A 多代理协作 | A2A 协议 | Kastrax 文档"
description: "A2A 协议的多代理协作机制，包括代理网络、协作模式和协作示例。"
---

# A2A 多代理协作 ✅

## 多代理协作概述 ✅

多代理协作是 A2A 协议的核心目标，允许多个代理协同工作，解决复杂问题。通过多代理协作，每个代理可以专注于自己的专业领域，共同完成单个代理无法完成的任务。

Kastrax 实现了全面的 A2A 多代理协作机制，包括代理网络、协作模式和协作工具，支持构建复杂的多代理系统。

## 代理网络 ✅

代理网络是多个代理的集合，它们可以相互发现、通信和协作。Kastrax 提供了代理网络的创建和管理功能：

```kotlin
// 创建代理网络
val agentNetwork = agentNetwork {
    // 添加代理
    agent(dataCollectorAgent)
    agent(dataAnalysisAgent)
    agent(reportGeneratorAgent)
    
    // 配置路由
    routing {
        // 根据能力路由
        route("collect_data") to "data-collector-agent"
        route("analyze_data") to "data-analysis-agent"
        route("generate_report") to "report-generator-agent"
        
        // 默认路由
        route("*") to "assistant-agent"
    }
    
    // 配置发现
    discovery {
        // 启用自动发现
        autoDiscovery = true
        
        // 添加已知服务器
        server("https://agent1.example.com")
        server("https://agent2.example.com")
    }
    
    // 配置安全
    security {
        // 使用 JWT 认证
        type = AuthType.JWT
        jwtSecret = "your-jwt-secret"
    }
}
```

## 协作模式 ✅

Kastrax 支持多种代理协作模式，适用于不同的场景：

### 1. 顺序协作 ✅

顺序协作是最简单的协作模式，代理按照预定义的顺序执行任务：

```kotlin
// 顺序协作
suspend fun sequentialCollaboration(marketId: String): MarketReport {
    // 1. 收集数据
    val dataRequest = InvokeRequest(
        capability = "collect_market_data",
        parameters = mapOf("market_id" to marketId)
    )
    val dataResponse = dataCollectorAgent.invoke(dataRequest)
    val marketData = dataResponse.result as MarketData

    // 2. 分析数据
    val analysisRequest = InvokeRequest(
        capability = "analyze_market_data",
        parameters = mapOf("market_data" to marketData)
    )
    val analysisResponse = dataAnalysisAgent.invoke(analysisRequest)
    val marketAnalysis = analysisResponse.result as MarketAnalysis

    // 3. 生成报告
    val reportRequest = InvokeRequest(
        capability = "generate_market_report",
        parameters = mapOf(
            "market_data" to marketData,
            "market_analysis" to marketAnalysis
        )
    )
    val reportResponse = reportGeneratorAgent.invoke(reportRequest)
    return reportResponse.result as MarketReport
}
```

### 2. 并行协作 ✅

并行协作允许多个代理同时执行任务，然后合并结果：

```kotlin
// 并行协作
suspend fun parallelCollaboration(marketId: String): MarketReport = coroutineScope {
    // 1. 并行收集和分析数据
    val dataDeferred = async {
        val dataRequest = InvokeRequest(
            capability = "collect_market_data",
            parameters = mapOf("market_id" to marketId)
        )
        val dataResponse = dataCollectorAgent.invoke(dataRequest)
        dataResponse.result as MarketData
    }
    
    val competitorDataDeferred = async {
        val competitorRequest = InvokeRequest(
            capability = "collect_competitor_data",
            parameters = mapOf("market_id" to marketId)
        )
        val competitorResponse = competitorAnalysisAgent.invoke(competitorRequest)
        competitorResponse.result as CompetitorData
    }
    
    val trendDataDeferred = async {
        val trendRequest = InvokeRequest(
            capability = "collect_trend_data",
            parameters = mapOf("market_id" to marketId)
        )
        val trendResponse = trendAnalysisAgent.invoke(trendRequest)
        trendResponse.result as TrendData
    }
    
    // 2. 等待所有数据收集完成
    val marketData = dataDeferred.await()
    val competitorData = competitorDataDeferred.await()
    val trendData = trendDataDeferred.await()
    
    // 3. 分析综合数据
    val analysisRequest = InvokeRequest(
        capability = "analyze_comprehensive_data",
        parameters = mapOf(
            "market_data" to marketData,
            "competitor_data" to competitorData,
            "trend_data" to trendData
        )
    )
    val analysisResponse = dataAnalysisAgent.invoke(analysisRequest)
    val marketAnalysis = analysisResponse.result as MarketAnalysis
    
    // 4. 生成报告
    val reportRequest = InvokeRequest(
        capability = "generate_market_report",
        parameters = mapOf(
            "market_data" to marketData,
            "competitor_data" to competitorData,
            "trend_data" to trendData,
            "market_analysis" to marketAnalysis
        )
    )
    val reportResponse = reportGeneratorAgent.invoke(reportRequest)
    reportResponse.result as MarketReport
}
```

### 3. 动态协作 ✅

动态协作允许代理根据任务需求动态选择协作伙伴：

```kotlin
// 动态协作
suspend fun dynamicCollaboration(task: Task): Result {
    // 1. 分析任务需求
    val requirementsRequest = InvokeRequest(
        capability = "analyze_task_requirements",
        parameters = mapOf("task" to task)
    )
    val requirementsResponse = taskAnalysisAgent.invoke(requirementsRequest)
    val requirements = requirementsResponse.result as TaskRequirements
    
    // 2. 发现具有所需能力的代理
    val capableAgents = mutableMapOf<String, A2AAgent>()
    for (capability in requirements.requiredCapabilities) {
        val agents = agentRegistry.findAgentsByCapability(capability)
        if (agents.isNotEmpty()) {
            // 选择最合适的代理
            val selectedAgent = selectBestAgent(agents, capability)
            capableAgents[capability] = selectedAgent
        } else {
            throw IllegalStateException("No agent found with capability: $capability")
        }
    }
    
    // 3. 创建执行计划
    val planRequest = InvokeRequest(
        capability = "create_execution_plan",
        parameters = mapOf(
            "task" to task,
            "requirements" to requirements,
            "available_agents" to capableAgents.keys
        )
    )
    val planResponse = planningAgent.invoke(planRequest)
    val executionPlan = planResponse.result as ExecutionPlan
    
    // 4. 执行计划
    return executePlan(executionPlan, capableAgents, task)
}

// 执行计划
suspend fun executePlan(
    plan: ExecutionPlan,
    agents: Map<String, A2AAgent>,
    task: Task
): Result = coroutineScope {
    // 执行计划中的步骤
    val results = mutableMapOf<String, Any>()
    
    for (step in plan.steps) {
        // 如果步骤有依赖，等待依赖完成
        for (dependency in step.dependencies) {
            if (!results.containsKey(dependency)) {
                throw IllegalStateException("Dependency not satisfied: $dependency")
            }
        }
        
        // 获取执行步骤的代理
        val agent = agents[step.capability] ?: throw IllegalStateException("No agent found for capability: ${step.capability}")
        
        // 准备参数
        val parameters = mutableMapOf<String, JsonElement>()
        for ((paramName, paramValue) in step.parameters) {
            // 如果参数值是引用，从结果中获取
            if (paramValue is String && paramValue.startsWith("$")) {
                val resultKey = paramValue.substring(1)
                parameters[paramName] = results[resultKey]?.toJsonElement() ?: throw IllegalStateException("Result not found: $resultKey")
            } else {
                parameters[paramName] = paramValue.toJsonElement()
            }
        }
        
        // 调用代理
        val request = InvokeRequest(
            capability = step.capability,
            parameters = parameters
        )
        val response = agent.invoke(request)
        
        // 存储结果
        results[step.id] = response.result
    }
    
    // 返回最终结果
    return@coroutineScope results[plan.steps.last().id] as Result
}
```

### 4. 层次协作 ✅

层次协作使用层次结构组织代理，上层代理协调下层代理：

```kotlin
// 层次协作
suspend fun hierarchicalCollaboration(task: Task): Result {
    // 1. 将任务委派给协调代理
    val coordinatorRequest = InvokeRequest(
        capability = "coordinate_task",
        parameters = mapOf("task" to task)
    )
    val coordinatorResponse = coordinatorAgent.invoke(coordinatorRequest)
    
    // 2. 协调代理将任务分解为子任务
    val subTasks = coordinatorResponse.result as List<SubTask>
    
    // 3. 为每个子任务选择合适的代理
    val subTaskResults = mutableListOf<SubTaskResult>()
    for (subTask in subTasks) {
        // 选择合适的代理
        val agent = selectAgentForSubTask(subTask)
        
        // 执行子任务
        val subTaskRequest = InvokeRequest(
            capability = subTask.capability,
            parameters = subTask.parameters
        )
        val subTaskResponse = agent.invoke(subTaskRequest)
        
        // 存储子任务结果
        subTaskResults.add(
            SubTaskResult(
                subTaskId = subTask.id,
                result = subTaskResponse.result
            )
        )
    }
    
    // 4. 协调代理合并子任务结果
    val mergeRequest = InvokeRequest(
        capability = "merge_results",
        parameters = mapOf("sub_task_results" to subTaskResults)
    )
    val mergeResponse = coordinatorAgent.invoke(mergeRequest)
    
    return mergeResponse.result as Result
}
```

### 5. 自组织协作 ✅

自组织协作允许代理自主组织协作，无需中央协调：

```kotlin
// 自组织协作
suspend fun selfOrganizingCollaboration(task: Task): Result {
    // 1. 广播任务到代理网络
    val taskBroadcastRequest = TaskBroadcastRequest(
        taskId = UUID.randomUUID().toString(),
        task = task
    )
    agentNetwork.broadcast(taskBroadcastRequest)
    
    // 2. 等待代理响应
    val responses = agentNetwork.collectResponses(taskBroadcastRequest.taskId, timeout = 30000)
    
    // 3. 代理自主协商协作方式
    val negotiationRequest = NegotiationRequest(
        taskId = taskBroadcastRequest.taskId,
        responses = responses
    )
    agentNetwork.broadcast(negotiationRequest)
    
    // 4. 等待协商结果
    val negotiationResult = agentNetwork.waitForNegotiationResult(negotiationRequest.taskId, timeout = 30000)
    
    // 5. 执行协商结果
    return executeNegotiationResult(negotiationResult)
}
```

## 协作工具 ✅

Kastrax 提供了多种协作工具，支持代理之间的协作：

### 1. 消息总线 ✅

消息总线允许代理发送和接收消息，实现异步通信：

```kotlin
// 创建消息总线
val messageBus = messageBus {
    // 配置主题
    topic("market-data") {
        // 配置消息类型
        messageType = MarketData::class
        
        // 配置订阅者
        subscribers = listOf("data-analysis-agent", "report-generator-agent")
    }
    
    topic("market-analysis") {
        messageType = MarketAnalysis::class
        subscribers = listOf("report-generator-agent")
    }
    
    // 配置安全
    security {
        // 使用 API 密钥认证
        type = AuthType.API_KEY
        apiKeys = mapOf(
            "api-key-1" to Principal("data-collector-agent", listOf("publisher"), listOf("publish")),
            "api-key-2" to Principal("data-analysis-agent", listOf("subscriber", "publisher"), listOf("subscribe", "publish")),
            "api-key-3" to Principal("report-generator-agent", listOf("subscriber"), listOf("subscribe"))
        )
    }
}

// 发布消息
messageBus.publish(
    topic = "market-data",
    message = marketData,
    publisher = "data-collector-agent"
)

// 订阅消息
messageBus.subscribe(
    topic = "market-data",
    subscriber = "data-analysis-agent"
) { message ->
    // 处理消息
    val marketData = message as MarketData
    // ...
}
```

### 2. 共享内存 ✅

共享内存允许代理共享数据，实现高效的数据交换：

```kotlin
// 创建共享内存
val sharedMemory = sharedMemory {
    // 配置区域
    region("market-data") {
        // 配置数据类型
        dataType = MarketData::class
        
        // 配置访问权限
        accessControl {
            // 读取权限
            read = listOf("data-collector-agent", "data-analysis-agent", "report-generator-agent")
            
            // 写入权限
            write = listOf("data-collector-agent")
        }
    }
    
    region("market-analysis") {
        dataType = MarketAnalysis::class
        accessControl {
            read = listOf("data-analysis-agent", "report-generator-agent")
            write = listOf("data-analysis-agent")
        }
    }
    
    // 配置安全
    security {
        // 使用 JWT 认证
        type = AuthType.JWT
        jwtSecret = "your-jwt-secret"
    }
}

// 写入数据
sharedMemory.write(
    region = "market-data",
    key = "market-123",
    value = marketData,
    writer = "data-collector-agent"
)

// 读取数据
val marketData = sharedMemory.read<MarketData>(
    region = "market-data",
    key = "market-123",
    reader = "data-analysis-agent"
)
```

### 3. 事件系统 ✅

事件系统允许代理发布和订阅事件，实现松耦合的通信：

```kotlin
// 创建事件系统
val eventSystem = eventSystem {
    // 配置事件类型
    eventType("market-data-collected") {
        // 配置事件数据类型
        dataType = MarketData::class
        
        // 配置订阅者
        subscribers = listOf("data-analysis-agent", "report-generator-agent")
    }
    
    eventType("market-analysis-completed") {
        dataType = MarketAnalysis::class
        subscribers = listOf("report-generator-agent")
    }
    
    // 配置安全
    security {
        // 使用 API 密钥认证
        type = AuthType.API_KEY
        apiKeys = mapOf(
            "api-key-1" to Principal("data-collector-agent", listOf("publisher"), listOf("publish")),
            "api-key-2" to Principal("data-analysis-agent", listOf("subscriber", "publisher"), listOf("subscribe", "publish")),
            "api-key-3" to Principal("report-generator-agent", listOf("subscriber"), listOf("subscribe"))
        )
    }
}

// 发布事件
eventSystem.publish(
    eventType = "market-data-collected",
    data = marketData,
    publisher = "data-collector-agent"
)

// 订阅事件
eventSystem.subscribe(
    eventType = "market-data-collected",
    subscriber = "data-analysis-agent"
) { event ->
    // 处理事件
    val marketData = event.data as MarketData
    // ...
}
```

### 4. 协作空间 ✅

协作空间是一个虚拟环境，允许多个代理共享状态和协作：

```kotlin
// 创建协作空间
val collaborationSpace = collaborationSpace {
    // 配置空间 ID
    id = "market-analysis-space"
    
    // 配置参与者
    participants = listOf("data-collector-agent", "data-analysis-agent", "report-generator-agent")
    
    // 配置共享状态
    sharedState {
        // 市场数据
        state("market-data") {
            dataType = MarketData::class
            accessControl {
                read = listOf("data-collector-agent", "data-analysis-agent", "report-generator-agent")
                write = listOf("data-collector-agent")
            }
        }
        
        // 市场分析
        state("market-analysis") {
            dataType = MarketAnalysis::class
            accessControl {
                read = listOf("data-analysis-agent", "report-generator-agent")
                write = listOf("data-analysis-agent")
            }
        }
        
        // 市场报告
        state("market-report") {
            dataType = MarketReport::class
            accessControl {
                read = listOf("report-generator-agent")
                write = listOf("report-generator-agent")
            }
        }
    }
    
    // 配置协作工具
    tools {
        // 消息工具
        tool("send-message") {
            description = "发送消息给其他参与者"
            parameters {
                parameter("recipient", "接收者", String::class)
                parameter("message", "消息内容", String::class)
            }
            execute { params ->
                val recipient = params["recipient"] as String
                val message = params["message"] as String
                sendMessage(recipient, message)
                "消息已发送"
            }
        }
        
        // 状态更新工具
        tool("update-state") {
            description = "更新共享状态"
            parameters {
                parameter("state-key", "状态键", String::class)
                parameter("state-value", "状态值", Any::class)
            }
            execute { params ->
                val stateKey = params["state-key"] as String
                val stateValue = params["state-value"]
                updateState(stateKey, stateValue)
                "状态已更新"
            }
        }
    }
    
    // 配置安全
    security {
        // 使用 JWT 认证
        type = AuthType.JWT
        jwtSecret = "your-jwt-secret"
    }
}

// 加入协作空间
val participant = collaborationSpace.join(
    participantId = "data-analysis-agent",
    credentials = JwtCredentials("your-jwt-token")
)

// 获取共享状态
val marketData = participant.getState<MarketData>("market-data")

// 更新共享状态
participant.updateState("market-analysis", marketAnalysis)

// 发送消息
participant.sendMessage(
    recipient = "report-generator-agent",
    message = "市场分析已完成，可以生成报告了"
)

// 接收消息
participant.onMessage { message ->
    println("收到消息：${message.content}")
}

// 使用工具
participant.useTool(
    toolId = "update-state",
    parameters = mapOf(
        "state-key" to "market-analysis",
        "state-value" to marketAnalysis
    )
)
```

## 协作示例 ✅

以下是一个完整的多代理协作示例，展示了如何使用 Kastrax 的 A2A 实现构建复杂的多代理系统：

```kotlin
// 创建 A2A 实例
val a2aInstance = a2a {
    // 配置代理网络
    agentNetwork {
        // 注册数据收集代理
        a2aAgent {
            id = "data-collector-agent"
            name = "数据收集代理"
            description = "收集市场数据的代理"
            
            // 添加能力
            capability {
                id = "collect_market_data"
                name = "收集市场数据"
                description = "收集指定市场的数据"
                
                // 添加参数
                parameter {
                    name = "market_id"
                    type = "string"
                    description = "市场ID"
                    required = true
                }
                
                returnType = "json"
            }
            
            // 配置认证
            authentication {
                type = AuthType.API_KEY
            }
        }
        
        // 注册数据分析代理
        a2aAgent {
            id = "data-analysis-agent"
            name = "数据分析代理"
            description = "分析市场数据的代理"
            
            // 添加能力
            capability {
                id = "analyze_market_data"
                name = "分析市场数据"
                description = "分析市场数据并生成分析结果"
                
                // 添加参数
                parameter {
                    name = "market_data"
                    type = "json"
                    description = "市场数据"
                    required = true
                }
                
                returnType = "json"
            }
            
            // 配置认证
            authentication {
                type = AuthType.API_KEY
            }
        }
        
        // 注册报告生成代理
        a2aAgent {
            id = "report-generator-agent"
            name = "报告生成代理"
            description = "生成市场报告的代理"
            
            // 添加能力
            capability {
                id = "generate_market_report"
                name = "生成市场报告"
                description = "根据市场数据和分析结果生成市场报告"
                
                // 添加参数
                parameter {
                    name = "market_data"
                    type = "json"
                    description = "市场数据"
                    required = true
                }
                
                parameter {
                    name = "market_analysis"
                    type = "json"
                    description = "市场分析结果"
                    required = true
                }
                
                returnType = "json"
            }
            
            // 配置认证
            authentication {
                type = AuthType.API_KEY
            }
        }
        
        // 配置协作工具
        collaborationTools {
            // 配置消息总线
            messageBus {
                // 配置主题
                topic("market-data") {
                    messageType = MarketData::class
                    subscribers = listOf("data-analysis-agent", "report-generator-agent")
                }
                
                topic("market-analysis") {
                    messageType = MarketAnalysis::class
                    subscribers = listOf("report-generator-agent")
                }
            }
            
            // 配置共享内存
            sharedMemory {
                // 配置区域
                region("market-data") {
                    dataType = MarketData::class
                    accessControl {
                        read = listOf("data-collector-agent", "data-analysis-agent", "report-generator-agent")
                        write = listOf("data-collector-agent")
                    }
                }
                
                region("market-analysis") {
                    dataType = MarketAnalysis::class
                    accessControl {
                        read = listOf("data-analysis-agent", "report-generator-agent")
                        write = listOf("data-analysis-agent")
                    }
                }
            }
        }
        
        // 配置工作流
        workflow {
            id = "market-report-workflow"
            name = "市场报告工作流"
            description = "生成市场报告的工作流"
            
            // 输入参数
            input {
                parameter("market_id", "市场ID", String::class)
            }
            
            // 工作流步骤
            steps {
                // 步骤 1：收集数据
                step("collect_data") {
                    agentId = "data-collector-agent"
                    capabilityId = "collect_market_data"
                    parameters {
                        parameter("market_id", context.get<String>("market_id"))
                    }
                    output = "market_data"
                }
                
                // 步骤 2：分析数据
                step("analyze_data") {
                    agentId = "data-analysis-agent"
                    capabilityId = "analyze_market_data"
                    parameters {
                        parameter("market_data", context.get<MarketData>("market_data"))
                    }
                    output = "market_analysis"
                    dependsOn = listOf("collect_data")
                }
                
                // 步骤 3：生成报告
                step("generate_report") {
                    agentId = "report-generator-agent"
                    capabilityId = "generate_market_report"
                    parameters {
                        parameter("market_data", context.get<MarketData>("market_data"))
                        parameter("market_analysis", context.get<MarketAnalysis>("market_analysis"))
                    }
                    output = "market_report"
                    dependsOn = listOf("analyze_data")
                }
            }
            
            // 输出参数
            output {
                parameter("market_report", "市场报告", MarketReport::class)
            }
        }
    }
    
    // 配置服务器
    server {
        port = 8080
        enableCors = true
    }
    
    // 配置安全
    security {
        // 使用 API 密钥认证
        type = AuthType.API_KEY
        apiKeys = mapOf(
            "api-key-1" to Principal("data-collector-agent", listOf("agent"), listOf("invoke")),
            "api-key-2" to Principal("data-analysis-agent", listOf("agent"), listOf("invoke")),
            "api-key-3" to Principal("report-generator-agent", listOf("agent"), listOf("invoke"))
        )
    }
}

// 启动 A2A 实例
a2aInstance.start()

// 获取工作流引擎
val workflowEngine = a2aInstance.getWorkflowEngine()

// 执行工作流
val result = workflowEngine.execute(
    workflowId = "market-report-workflow",
    input = mapOf(
        "market_id" to "MARKET-123"
    )
)

// 获取市场报告
val marketReport = result.get<MarketReport>("market_report")
println("市场报告：$marketReport")

// 停止 A2A 实例
a2aInstance.stop()
```

## 总结 ✅

Kastrax 的 A2A 实现提供了全面的多代理协作机制，包括代理网络、协作模式和协作工具，支持构建复杂的多代理系统。通过多代理协作，每个代理可以专注于自己的专业领域，共同完成单个代理无法完成的任务。

## 下一步 ✅

了解了 A2A 多代理协作后，您可以：

1. 学习 [A2A 代理发现](/docs/a2a/a2a-discovery.mdx)
2. 了解 [A2A 任务委派](/docs/a2a/a2a-delegation.mdx)
3. 研究 [A2A 安全机制](/docs/a2a/a2a-security.mdx)
4. 学习 [工作流引擎](/docs/a2a/a2a-workflow.mdx)
5. 研究 [A2A 协议规范](/docs/a2a/a2a-specification.mdx)
